Support user provided codecs #81
| #[tokio::test] | ||
| #[ignore] | ||
| async fn custom_extension_codec() -> Result<(), Box<dyn std::error::Error>> { | ||
| #[derive(Clone)] |
This test cannot yet be un-ignored, as the expected results are failing, but the custom codec propagation works.
| use std::sync::Arc; | ||
| #[tokio::test] | ||
| #[ignore] |
This test now works! 🚀
| type Error = DataFusionError; | ||
| pub fn proto_from_stage( | ||
| stage: &ExecutionStage, | ||
| codec: &dyn PhysicalExtensionCodec, |
Since a codec is now needed here, this can no longer be implemented as a simple TryFrom implementation.
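To illustrate the point above: `TryFrom::try_from` takes only the value being converted, so there is no parameter through which a codec could be passed. A free function with an extra argument avoids that limitation. The following is a minimal sketch under stated assumptions: `Codec`, `Stage`, `StageProto`, and `Utf8Codec` are hypothetical stand-ins, not the real `PhysicalExtensionCodec`, `ExecutionStage`, or protobuf types from this PR.

```rust
// Hypothetical stand-in for a codec trait (NOT the real PhysicalExtensionCodec).
trait Codec {
    fn encode(&self, node: &str) -> Result<Vec<u8>, String>;
}

// Hypothetical stand-ins for the stage and its serialized form.
struct Stage {
    plan: String,
}

struct StageProto {
    plan_bytes: Vec<u8>,
}

// `impl TryFrom<&Stage> for StageProto` would only receive `&Stage`:
// its signature is `fn try_from(value: &Stage) -> Result<Self, Self::Error>`,
// with no room for a codec. A plain function can take the codec explicitly.
fn proto_from_stage(stage: &Stage, codec: &dyn Codec) -> Result<StageProto, String> {
    Ok(StageProto {
        plan_bytes: codec.encode(&stage.plan)?,
    })
}

// A toy codec used only for this illustration.
struct Utf8Codec;

impl Codec for Utf8Codec {
    fn encode(&self, node: &str) -> Result<Vec<u8>, String> {
        if node.is_empty() {
            return Err("empty plan".to_string());
        }
        Ok(node.as_bytes().to_vec())
    }
}
```

Calling `proto_from_stage(&stage, &Utf8Codec)` makes the codec dependency visible at every call site, which a trait-based conversion would hide.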
| /// An optional codec to assist in serializing and deserializing this stage | ||
| pub codec: Option<Arc<dyn PhysicalExtensionCodec>>, |
Before, this field ended up being Some() in the head stage but None in every other stage. As it is always None in the ArrowFlightEndpoint, there is no way to use a custom user-provided codec there.
robtandy
left a comment
This is good; thank you @gabotechs !
NGA-TRAN
left a comment
Another great work. Thanks Gabriel
| // Nobody will never want to retriever a user codec from a SessionStateBuilder | ||
| None | ||
| } | ||
| } |
The content of this file is awesome: a new UserCodecTransport trait for setting and getting the codec on existing DF structs, plus comments showing how to use it.
| .write() | ||
| .config_mut() | ||
| .set_extension(Arc::new(codec)); | ||
| add_user_codec(&mut ctx, Int64ListExecCodec); |
Nice and simple
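The diff above stores the codec as a config extension and wraps that in an `add_user_codec` helper. A rough sketch of how such a type-keyed extension slot can carry a codec is shown below. Everything here is an assumption made for illustration: `Config`, `Codec`, `UserCodec`, and `get_user_codec` are hypothetical stand-ins, and the `Int64ListExecCodec` defined here is a toy with the same name as the one in the test, not its real implementation.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for a codec trait.
trait Codec: Send + Sync {
    fn name(&self) -> &'static str;
}

// Wrapper type so the trait object can be stored under one concrete TypeId.
struct UserCodec(Arc<dyn Codec>);

// Hypothetical stand-in for a session config with type-keyed extensions.
#[derive(Default)]
struct Config {
    extensions: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Config {
    fn set_extension<T: Any + Send + Sync>(&mut self, ext: Arc<T>) {
        self.extensions.insert(TypeId::of::<T>(), ext);
    }

    fn get_extension<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.extensions
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|ext| ext.downcast::<T>().ok())
    }
}

// One entry point for registering a codec, usable wherever a Config exists.
fn add_user_codec(cfg: &mut Config, codec: impl Codec + 'static) {
    cfg.set_extension(Arc::new(UserCodec(Arc::new(codec))));
}

// The matching lookup, returning None when no codec was registered.
fn get_user_codec(cfg: &Config) -> Option<Arc<dyn Codec>> {
    cfg.get_extension::<UserCodec>().map(|w| Arc::clone(&w.0))
}

// Toy codec for the illustration only.
struct Int64ListExecCodec;

impl Codec for Int64ListExecCodec {
    fn name(&self) -> &'static str {
        "Int64ListExecCodec"
    }
}
```

Because registration and lookup both go through the config, the same pair of functions can serve any component that holds a config, which is the ergonomic property the reviewers are praising.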
| combined_codec.push_arc(Arc::clone(user_codec)); | ||
| } | ||
| let child_stage_proto = proto_from_stage(child_stage, &combined_codec).map_err(|e| { |
Intuitively this makes sense, but I think I will only fully understand the role of combined_codec once I start using it.
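One plausible reading of the role of combined_codec is a codec that holds several codecs and delegates to the first one able to handle a given node, so built-in nodes and user-defined nodes can be serialized through a single entry point. The sketch below illustrates that idea only. `Codec`, `CombinedCodec`, `PrefixCodec`, and `encode_with_both` are hypothetical stand-ins, and the actual delegation strategy used in this PR may differ.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a codec trait.
trait Codec {
    fn try_encode(&self, node: &str) -> Result<Vec<u8>, String>;
}

// A codec that delegates to a list of inner codecs, in insertion order.
#[derive(Default)]
struct CombinedCodec {
    codecs: Vec<Arc<dyn Codec>>,
}

impl CombinedCodec {
    fn push_arc(&mut self, codec: Arc<dyn Codec>) {
        self.codecs.push(codec);
    }
}

impl Codec for CombinedCodec {
    fn try_encode(&self, node: &str) -> Result<Vec<u8>, String> {
        let mut errors = Vec::new();
        for codec in &self.codecs {
            match codec.try_encode(node) {
                // The first codec that understands the node wins.
                Ok(bytes) => return Ok(bytes),
                Err(e) => errors.push(e),
            }
        }
        Err(format!("no codec could encode {node:?}: {}", errors.join("; ")))
    }
}

// Toy codec: handles only nodes whose name starts with `prefix`.
struct PrefixCodec {
    prefix: &'static str,
    tag: u8,
}

impl Codec for PrefixCodec {
    fn try_encode(&self, node: &str) -> Result<Vec<u8>, String> {
        if node.starts_with(self.prefix) {
            Ok(vec![self.tag])
        } else {
            Err(format!("{} codec: unsupported node", self.prefix))
        }
    }
}

// Builds a combined codec from a "built-in" codec plus a user codec.
fn encode_with_both(node: &str) -> Result<Vec<u8>, String> {
    let mut combined = CombinedCodec::default();
    combined.push_arc(Arc::new(PrefixCodec { prefix: "dist:", tag: 1 }));
    let user_codec: Arc<dyn Codec> = Arc::new(PrefixCodec { prefix: "user:", tag: 2 });
    combined.push_arc(Arc::clone(&user_codec));
    combined.try_encode(node)
}
```

In this sketch a node is rejected only when every inner codec rejects it, and the error message collects each codec's reason to make debugging easier.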
This PR adds support for user-defined codecs. Two main pieces of work are shipped:
- DistributedCodec in both the head stage of the plan that gets executed locally and any further stages that get executed remotely in an ArrowFlightEndpoint
About the second point:
Previously, the user-defined codec was provided as an optional input to the DistributedPhysicalOptimizerRule, which ergonomically made a lot of sense. However, the DistributedPhysicalOptimizerRule has no way of propagating the user-defined codec down to remote workers.
In practice, users with custom codecs need to inject it twice:
1) DistributedPhysicalOptimizerRule that holds a reference to the codec and injects it into the ExecutionStage)
2) ArrowFlightEndpoint that runs in a different machine (currently unaddressed)
The proposal in this PR is to find an API that is as consistent as possible for 1) and 2), so that providing a custom codec is a consistent and ergonomic experience whether it is for the head stage running locally or for a worker running an ArrowFlightEndpoint.
For that, just some plain functions are shipped:
- providing a codec for an ArrowFlightEndpoint that will run remotely
- providing a codec for the head of the plan that runs locally
The challenge with keeping the old API (DistributedPhysicalOptimizerRule::with_codec()) is that users would have to learn two different ways of providing their codecs, and the ExecutionStage.codec field would become a pure transport field that adds no semantic meaning to the structure and exists only to carry the codec around.